package org.databandtech.mysql2kafka;

import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.databandtech.mysql2kafka.source.SourceFromMySQLByTuple5;
import org.apache.flink.api.common.functions.MapFunction;

public class MysqlToKafka {

	// 数据库相关
	final static String URL = "jdbc:mysql://127.0.0.1:3306/databand?useUnicode=true&characterEncoding=utf-8&useSSL=false";
	final static String USER = "root";
	final static String PASS = "mysql";
	final static String SQL = "SELECT title,status,vid,cover_id,url from databand_video ORDER BY id DESC LIMIT 100";
	final static String[] COLUMNS_READ = new String[] { "title", "status", "vid", "cover_id", "url" };
	
	final static String SINK_TOPIC = "Hello-Kafka";


	public static void main(String[] args) {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        DataStreamSource<Tuple5<String,Integer,String,String,String>> streamSource = env.addSource(
				new SourceFromMySQLByTuple5(URL,USER,PASS,COLUMNS_READ,SQL));
		
        streamSource.print();
        SingleOutputStreamOperator<String> dataStream = streamSource.map(new MapTransformation());
        
      //数据写入
  		Properties propertiesSink = new Properties();
  		
  		propertiesSink.setProperty("bootstrap.servers", "192.168.10.60:9092");
  		propertiesSink.setProperty("stream.checkpoint.interval", "5000");
  		propertiesSink.setProperty("stream.sink.parallelism", "2");

  		SinkFunction<String> myProducer = new FlinkKafkaProducer(SINK_TOPIC,new SimpleStringSchema(),propertiesSink);   // 序列化 schema
  		
  		dataStream.addSink(myProducer);
  		
  		try {
  			env.execute("ok-kafka");
  		} catch (Exception e) {
  			e.printStackTrace();
  		}
	}
	
	public static class MapTransformation implements MapFunction<Tuple5<String,Integer,String,String,String>,String> {

		public String map(Tuple5<String,Integer,String,String,String> in) throws Exception {
			String result = in.f0 +"-"+in.f1 +"-"+in.f2 +"-"+in.f3 +"-"+in.f4 ;
			return result;
		}
	}
}
